package com.xiaoxin.executor.batch.job.user;

import com.xiaoxin.executor.batch.partitioner.UserPartitioner;
import com.xiaoxin.executor.domain.User;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * Spring Batch configuration for a partitioned job that reads {@link User} records
 * from delimited flat files and writes them to standard error.
 *
 * <p>The job consists of a single master step ({@code userMasterStep}) that uses
 * {@link UserPartitioner} to create partitions and a {@link TaskExecutorPartitionHandler}
 * to run the worker step ({@code userWorkStep}) for each partition on the injected
 * {@link ThreadPoolTaskExecutor}.
 */
@Configuration
public class UserJob {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    /**
     * Step-scoped reader for the file assigned to a single partition.
     *
     * <p>Each line is split on {@code #} into the fields {@code id}, {@code name}
     * and {@code age}, which are mapped onto a {@link User}.
     *
     * @param resource the file to read, late-bound from the {@code file} entry of the
     *                 step execution context (presumably populated by
     *                 {@link UserPartitioner} — verify against that class)
     * @return a reader producing one {@link User} per line of {@code resource}
     */
    @Bean
    @StepScope
    public FlatFileItemReader<User> userFlatItemReader(@Value("#{stepExecutionContext['file']}") Resource resource) {
        return new FlatFileItemReaderBuilder<User>()
                .name("userItemReader")
                .resource(resource)
                .delimited().delimiter("#")
                .names("id", "name", "age")
                .targetType(User.class)
                .build();
    }

    /**
     * Writer that prints every item of a chunk to {@code System.err}.
     *
     * @return the item writer used by the worker step
     */
    @Bean
    public ItemWriter<User> itemWriter() {
        return items -> items.forEach(System.err::println);
    }


    /**
     * File partitioner that defines the partitioning rules for the master step.
     *
     * @return a new {@link UserPartitioner}
     */
    @Bean
    public UserPartitioner userPartitioner() {
        return new UserPartitioner();
    }

    /**
     * Partition handler that executes the worker step for each partition using the
     * injected task executor, with a grid size of 5.
     *
     * @return the fully initialised partition handler
     * @throws IllegalStateException if the handler fails its own property validation
     */
    @Bean
    public PartitionHandler userPartitionHandler() {
        TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
        handler.setGridSize(5);
        handler.setTaskExecutor(threadPoolTaskExecutor);
        handler.setStep(userWorkStep());
        try {
            handler.afterPropertiesSet();
        } catch (Exception e) {
            // Fail fast: returning a handler that did not pass validation would only
            // defer the error to job execution time and hide the real cause.
            throw new IllegalStateException("Failed to initialise userPartitionHandler", e);
        }
        return handler;
    }

    /**
     * Worker step executed once per partition: reads {@link User} items from the
     * partition's file and writes them in chunks of 10.
     *
     * @return the worker step named {@code userWorkStep}
     */
    @Bean
    public Step userWorkStep() {
        return stepBuilderFactory.get("userWorkStep")
                .<User, User>chunk(10)
                // null is only a placeholder argument; the reader is step-scoped and its
                // resource is expected to be resolved from the step execution context.
                .reader(userFlatItemReader(null))
                .writer(itemWriter())
                .build();
    }

    /**
     * Master step that partitions the work with {@link #userPartitioner()} and
     * delegates execution of each partition to {@link #userPartitionHandler()}.
     *
     * @return the master step named {@code userMasterStep}
     */
    @Bean
    public Step userMasterStep() {
        return stepBuilderFactory.get("userMasterStep")
                .partitioner(userWorkStep().getName(), userPartitioner())
                .partitionHandler(userPartitionHandler())
                .build();
    }


    /**
     * The partitioned job, consisting solely of the master step.
     *
     * @return the job named {@code userPartitionJob}
     */
    @Bean
    public Job userPartitionJob() {
        return jobBuilderFactory.get("userPartitionJob")
                .start(userMasterStep())
                .build();
    }
}